在前兩天我們完成了金流串接與 webhook 整合,系統已經可以自動處理付款並推播通知。但在真實環境中,網路會斷線、API 會 timeout、服務會掛掉。
如果 Worker 推播通知時 LINE API 暫時無法回應,我們該如何確保通知不會遺失?當系統出錯時,該如何快速發現並修復?
今天我們要建立一套完整的錯誤處理、重試機制與監控系統,讓訂單系統更加穩定可靠。
設計 Redis-based retry queue(失敗任務重試機制)
實作重試次數上限與 exponential backoff 策略
建立 dead-letter queue 記錄永遠失敗的任務
實作健康檢查端點 /healthz
整合錯誤通知(Sentry / LINE 群組報錯)
在分散式系統中,失敗是常態。以下是常見的錯誤情境:
LINE API 暫時無法回應
- Rate limit 超過(每秒最多 100 requests)
- LINE 服務維護中
- 網路暫時不穩定
資料庫連線失敗
- MongoDB 重啟
- 連線池滿載
- 網路延遲過高
Redis 連線中斷
- Redis 重啟
- 記憶體不足
- 網路問題
如果沒有重試機制,這些暫時性錯誤會導致通知永久遺失,造成客戶抱怨與信任流失。
我們將建立三層佇列來處理不同狀態的任務:

Main Queue:正常的通知任務
Retry Queue:處理失敗的任務,帶有重試次數與延遲時間
Dead Letter Queue (DLQ):超過重試上限的任務,需人工檢查
更新 src/queues/notify.js:
const { ensureConnected } = require("../lib/redis");
const QUEUE_KEY = "queue:notify:v1";
const RETRY_QUEUE_KEY = "queue:retry:v1";
const DLQ_KEY = "queue:dlq:v1";
const MAX_RETRY_COUNT = 3; // 最多重試 3 次
const RETRY_DELAYS = [5000, 15000, 60000]; // 5秒、15秒、60秒 (exponential backoff)
/**
* 將通知任務加入主佇列
*/
async function enqueueNotify(payload) {
const redis = await ensureConnected();
// 加入初始重試次數
const taskWithMeta = {
...payload,
retryCount: 0,
createdAt: new Date().toISOString(),
};
await redis.rPush(QUEUE_KEY, JSON.stringify(taskWithMeta));
console.log('✅ 任務已加入主佇列:', payload.type);
}
/**
* 將失敗任務加入重試佇列
* @param {Object} payload - 任務資料
* @param {Error} error - 錯誤資訊
*/
async function enqueueRetry(payload, error) {
const redis = await ensureConnected();
const retryCount = (payload.retryCount || 0) + 1;
// 超過重試上限,送到 DLQ
if (retryCount > MAX_RETRY_COUNT) {
console.error(`❌ 任務超過重試上限,移至 DLQ: ${payload.type}`);
await enqueueDLQ(payload, error);
return;
}
// 計算延遲時間(exponential backoff)
const delay = RETRY_DELAYS[retryCount - 1] || 60000;
const retryAt = Date.now() + delay;
const retryTask = {
...payload,
retryCount,
lastError: {
message: error.message,
stack: error.stack,
timestamp: new Date().toISOString(),
},
retryAt,
};
// 使用 ZADD 建立有序集合(sorted set),以 retryAt 作為 score
await redis.zAdd(RETRY_QUEUE_KEY, {
score: retryAt,
value: JSON.stringify(retryTask),
});
console.log(`🔄 任務加入重試佇列 (第 ${retryCount} 次,${delay}ms 後重試): ${payload.type}`);
}
/**
* 將永遠失敗的任務移至 Dead Letter Queue
*/
async function enqueueDLQ(payload, error) {
const redis = await ensureConnected();
const dlqTask = {
...payload,
failedAt: new Date().toISOString(),
finalError: {
message: error.message,
stack: error.stack,
},
};
await redis.rPush(DLQ_KEY, JSON.stringify(dlqTask));
console.error('💀 任務已移至 DLQ:', payload.type);
}
/**
* 從重試佇列取出到期的任務
* @returns {Array} 到期的任務列表
*/
async function getRetryTasks() {
const redis = await ensureConnected();
const now = Date.now();
// 使用 ZRANGEBYSCORE 取出 score <= now 的任務
const tasks = await redis.zRangeByScore(RETRY_QUEUE_KEY, 0, now);
if (tasks.length > 0) {
// 從 retry queue 移除已取出的任務
await redis.zRemRangeByScore(RETRY_QUEUE_KEY, 0, now);
console.log(`📥 從重試佇列取出 ${tasks.length} 個到期任務`);
}
return tasks.map(task => JSON.parse(task));
}
/**
* 取得 DLQ 中的所有任務(用於監控與人工處理)
*/
async function getDLQTasks() {
const redis = await ensureConnected();
const tasks = await redis.lRange(DLQ_KEY, 0, -1);
return tasks.map(task => JSON.parse(task));
}
/**
* 清除 DLQ 中的指定任務
*/
async function removeDLQTask(taskJson) {
const redis = await ensureConnected();
await redis.lRem(DLQ_KEY, 1, taskJson);
console.log('🗑️ 已從 DLQ 移除任務');
}
module.exports = {
QUEUE_KEY,
RETRY_QUEUE_KEY,
DLQ_KEY,
enqueueNotify,
enqueueRetry,
enqueueDLQ,
getRetryTasks,
getDLQTasks,
removeDLQTask,
};
重點說明:
Exponential Backoff:重試間隔逐漸增加(5s → 15s → 60s)
Sorted Set (ZADD):用時間戳記作為 score,實現延遲佇列
DLQ 設計:保留完整錯誤資訊,方便事後分析
Metadata 追蹤:記錄重試次數、錯誤訊息、時間戳記
更新 worker.js 或 src/queues/worker.js:
const { ensureConnected } = require("./lib/redis");
const {
QUEUE_KEY,
enqueueRetry,
getRetryTasks,
} = require("./queues/notify");
const line = require("@line/bot-sdk");
const client = new line.messagingApi.MessagingApiClient({
channelAccessToken: process.env.CHANNEL_ACCESS_TOKEN,
});
let isHealthy = true; // 健康狀態標記
/**
* 啟動 Worker:主佇列 + 重試佇列
*/
async function startWorker() {
const redis = await ensureConnected();
console.log('🚀 Worker 啟動,監聽 Redis 佇列...');
// 定期檢查重試佇列(每 5 秒)
setInterval(async () => {
try {
const retryTasks = await getRetryTasks();
for (const task of retryTasks) {
console.log(`🔄 重試任務: ${task.type} (第 ${task.retryCount} 次)`);
await processTask(task);
}
} catch (error) {
console.error('❌ 處理重試佇列失敗:', error);
isHealthy = false;
}
}, 5000);
// 主佇列監聽
while (true) {
try {
const result = await redis.blPop(QUEUE_KEY, 0);
if (!result) continue;
const task = JSON.parse(result.element);
await processTask(task);
isHealthy = true; // 成功處理,標記為健康
} catch (error) {
console.error('❌ Worker 處理失敗:', error);
isHealthy = false;
await new Promise(resolve => setTimeout(resolve, 3000));
}
}
}
/**
* 處理單一任務(包含錯誤處理與重試)
*/
async function processTask(task) {
try {
console.log('📥 收到任務:', task.type);
switch (task.type) {
case 'new_order':
await handleNewOrder(task);
break;
case 'payment_success':
await handlePaymentSuccess(task);
break;
default:
console.warn('⚠️ 未知的任務類型:', task.type);
}
console.log('✅ 任務處理完成:', task.type);
} catch (error) {
console.error(`❌ 任務處理失敗 (${task.type}):`, error.message);
// 將失敗任務加入重試佇列
await enqueueRetry(task, error);
}
}
async function handleNewOrder(payload) {
const { userId, orderId, items } = payload;
if (!userId) {
throw new Error('userId 不可為空');
}
const message = {
type: 'text',
text: `✅ 您的訂單已建立成功!\n訂單編號:${orderId}\n品項:${items.map(i => i.productName).join(', ')}\n\n請完成付款。`,
};
await client.pushMessage({ to: userId, messages: [message] });
console.log(`✅ 已推播新訂單通知給 ${userId}`);
}
async function handlePaymentSuccess(payload) {
const { userId, orderId, amount, tradeNo } = payload;
if (!userId) {
throw new Error('userId 不可為空');
}
const message = {
type: 'text',
text: `🎉 付款成功!\n\n訂單編號:${orderId}\n付款金額:NT$ ${amount}\n交易編號:${tradeNo}\n\n您的訂單正在處理中,稍後將為您送達!`,
};
await client.pushMessage({ to: userId, messages: [message] });
console.log(`✅ 已推播付款通知給 ${userId}`);
}
/**
* 取得 Worker 健康狀態(供 healthz 端點使用)
*/
function getHealthStatus() {
return isHealthy;
}
startWorker();
module.exports = { getHealthStatus };
重點說明:
雙佇列監聽:同時處理主佇列與重試佇列
統一錯誤處理:processTask() 統一處理所有任務類型
自動重試:失敗時自動呼叫 enqueueRetry()
健康狀態追蹤:用於 /healthz 端點回報
在 index.js 或 src/routes/health.js 中新增:
const express = require("express");
const router = express.Router();
const { ensureConnected } = require("../lib/redis");
const mongoose = require("mongoose");
/**
* 健康檢查端點
* GET /healthz
*/
router.get("/healthz", async (req, res) => {
const health = {
status: "healthy",
timestamp: new Date().toISOString(),
checks: {},
};
try {
// 檢查 MongoDB 連線
if (mongoose.connection.readyState === 1) {
health.checks.mongodb = "ok";
} else {
health.checks.mongodb = "disconnected";
health.status = "unhealthy";
}
// 檢查 Redis 連線
try {
const redis = await ensureConnected();
await redis.ping();
health.checks.redis = "ok";
} catch (error) {
health.checks.redis = "disconnected";
health.status = "unhealthy";
}
// 檢查 Worker 狀態(可選:透過 Redis key 追蹤)
try {
const redis = await ensureConnected();
const workerHeartbeat = await redis.get("worker:heartbeat");
if (workerHeartbeat) {
const lastHeartbeat = parseInt(workerHeartbeat);
const now = Date.now();
// 如果超過 30 秒沒有心跳,視為不健康
if (now - lastHeartbeat < 30000) {
health.checks.worker = "ok";
} else {
health.checks.worker = "stale";
health.status = "degraded";
}
} else {
health.checks.worker = "unknown";
}
} catch (error) {
health.checks.worker = "error";
}
// 根據狀態回傳對應的 HTTP status code
const statusCode = health.status === "healthy" ? 200 : 503;
res.status(statusCode).json(health);
} catch (error) {
res.status(503).json({
status: "unhealthy",
error: error.message,
timestamp: new Date().toISOString(),
});
}
});
/**
* 取得佇列統計資訊
* GET /healthz/queues
*/
router.get("/healthz/queues", async (req, res) => {
try {
const redis = await ensureConnected();
const stats = {
mainQueue: await redis.lLen("queue:notify:v1"),
retryQueue: await redis.zCard("queue:retry:v1"),
dlq: await redis.lLen("queue:dlq:v1"),
timestamp: new Date().toISOString(),
};
res.json(stats);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
module.exports = router;
在 index.js 中引入:
const healthRoutes = require('./src/routes/health');
app.use('/', healthRoutes);
重點說明:
多層次檢查:MongoDB、Redis、Worker 心跳
狀態分級:healthy / degraded / unhealthy
佇列統計:即時查看佇列堆積情況
標準格式:符合 Kubernetes / Docker 健康檢查規範
建立 src/utils/alerting.js:
const line = require("@line/bot-sdk");
const client = new line.messagingApi.MessagingApiClient({
channelAccessToken: process.env.CHANNEL_ACCESS_TOKEN,
});
/**
* 發送錯誤通知到 LINE 群組
* @param {string} title - 錯誤標題
* @param {Error} error - 錯誤物件
* @param {Object} context - 上下文資訊
*/
async function sendErrorAlert(title, error, context = {}) {
// 如果沒有設定 LINE_ALERT_GROUP_ID,跳過通知
if (!process.env.LINE_ALERT_GROUP_ID) {
console.log('⚠️ 未設定 LINE_ALERT_GROUP_ID,跳過錯誤通知');
return;
}
try {
const message = {
type: 'text',
text: `🚨 系統錯誤警報\n\n` +
`錯誤:${title}\n` +
`訊息:${error.message}\n` +
`時間:${new Date().toISOString()}\n` +
`\n` +
`上下文:\n${JSON.stringify(context, null, 2)}`,
};
await client.pushMessage({
to: process.env.LINE_ALERT_GROUP_ID,
messages: [message],
});
console.log('✅ 已發送錯誤通知到 LINE 群組');
} catch (alertError) {
// 避免通知系統本身的錯誤造成無限迴圈
console.error('❌ 發送錯誤通知失敗:', alertError.message);
}
}
/**
* 發送 DLQ 警報(任務進入 Dead Letter Queue)
*/
async function sendDLQAlert(task, error) {
await sendErrorAlert(
'DLQ Alert: 任務超過重試上限',
error,
{
taskType: task.type,
retryCount: task.retryCount,
orderId: task.orderId,
userId: task.userId,
}
);
}
module.exports = {
sendErrorAlert,
sendDLQAlert,
};
在 src/queues/notify.js 中整合通知:
const { sendDLQAlert } = require("../utils/alerting");
async function enqueueDLQ(payload, error) {
const redis = await ensureConnected();
const dlqTask = {
...payload,
failedAt: new Date().toISOString(),
finalError: {
message: error.message,
stack: error.stack,
},
};
await redis.rPush(DLQ_KEY, JSON.stringify(dlqTask));
console.error('💀 任務已移至 DLQ:', payload.type);
// 發送 LINE 群組通知
await sendDLQAlert(payload, error);
}
在 .env 中新增設定:
LINE_ALERT_GROUP_ID=Cxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
重點說明:
即時警報:任務進入 DLQ 時立即通知
詳細資訊:包含錯誤訊息、上下文、時間戳記
可選配置:未設定群組 ID 時不影響系統運作
避免迴圈:通知系統本身的錯誤不會再次觸發通知
在 worker.js 中暫時加入錯誤:
async function handlePaymentSuccess(payload) {
const { userId, orderId, amount, tradeNo } = payload;
// 模擬錯誤(測試用)
if (Math.random() < 0.7) { // 70% 機率失敗
throw new Error('模擬 LINE API timeout');
}
// ... 正常邏輯
}
觀察 Console 輸出:
📥 收到任務: payment_success
❌ 任務處理失敗 (payment_success): 模擬 LINE API timeout
🔄 任務加入重試佇列 (第 1 次,5000ms 後重試): payment_success
(5 秒後)
🔄 重試任務: payment_success (第 1 次)
❌ 任務處理失敗 (payment_success): 模擬 LINE API timeout
🔄 任務加入重試佇列 (第 2 次,15000ms 後重試): payment_success
(15 秒後)
🔄 重試任務: payment_success (第 2 次)
❌ 任務處理失敗 (payment_success): 模擬 LINE API timeout
🔄 任務加入重試佇列 (第 3 次,60000ms 後重試): payment_success
(60 秒後)
🔄 重試任務: payment_success (第 3 次)
❌ 任務處理失敗 (payment_success): 模擬 LINE API timeout
❌ 任務超過重試上限,移至 DLQ: payment_success
💀 任務已移至 DLQ: payment_success
✅ 已發送錯誤通知到 LINE 群組
curl http://localhost:3000/healthz
回應:
{
"status": "healthy",
"timestamp": "2025-10-13T10:30:00.000Z",
"checks": {
"mongodb": "ok",
"redis": "ok",
"worker": "ok"
}
}
查看佇列統計:
curl http://localhost:3000/healthz/queues
回應:
{
"mainQueue": 5,
"retryQueue": 2,
"dlq": 1,
"timestamp": "2025-10-13T10:30:00.000Z"
}
如果需要更專業的錯誤追蹤,可以整合 Sentry:
npm install @sentry/node
在 index.js 最前面加入:
const Sentry = require("@sentry/node");
Sentry.init({
dsn: process.env.SENTRY_DSN,
environment: process.env.NODE_ENV || 'development',
tracesSampleRate: 1.0,
});
// 在所有 middleware 之前
app.use(Sentry.Handlers.requestHandler());
// ... 其他路由設定
// 在所有路由之後
app.use(Sentry.Handlers.errorHandler());
const Sentry = require("@sentry/node");
async function processTask(task) {
try {
// ... 處理邏輯
} catch (error) {
console.error(`❌ 任務處理失敗 (${task.type}):`, error.message);
// 回報到 Sentry
Sentry.captureException(error, {
tags: {
taskType: task.type,
retryCount: task.retryCount,
},
extra: {
task,
},
});
await enqueueRetry(task, error);
}
}
在 .env 新增:
SENTRY_DSN=https://xxxxx@sentry.io/xxxxx
Sentry 的優勢:
自動錯誤分組:相同錯誤會自動聚合
Stack trace 追蹤:完整的呼叫堆疊
效能監控:API 回應時間、資料庫查詢時間
即時通知:Email / Slack / Discord 整合
建立管理端點處理 DLQ:
const express = require("express");
const router = express.Router();
const { getDLQTasks, removeDLQTask, enqueueNotify } = require("../queues/notify");
/**
* 查看 DLQ 中的所有任務
* GET /admin/dlq
*/
router.get("/admin/dlq", async (req, res) => {
try {
const tasks = await getDLQTasks();
res.json({
count: tasks.length,
tasks,
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
/**
* 重新處理 DLQ 中的特定任務
* POST /admin/dlq/retry
*/
router.post("/admin/dlq/retry", async (req, res) => {
try {
const { taskIndex } = req.body;
const tasks = await getDLQTasks();
if (taskIndex >= tasks.length) {
return res.status(404).json({ error: "任務不存在" });
}
const task = tasks[taskIndex];
// 重置 retryCount 並重新加入主佇列
delete task.retryCount;
delete task.lastError;
delete task.failedAt;
delete task.finalError;
await enqueueNotify(task);
// 從 DLQ 移除
await removeDLQTask(JSON.stringify(task));
res.json({
success: true,
message: "任務已重新加入主佇列",
task,
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
module.exports = router;
在生產環境中,建議整合以下監控工具:
監控指標:
佇列長度(main / retry / dlq)
任務處理速率
錯誤率
Worker 健康狀態
視覺化查看 Redis 資料:
npm install -g redis-commander
redis-commander
訪問 http://localhost:8081 即可查看佇列內容。
建立簡單的 HTML 頁面顯示即時統計:
<!DOCTYPE html>
<html>
<head>
<title>訂單系統監控</title>
<script>
async function loadStats() {
const res = await fetch('/healthz/queues');
const data = await res.json();
document.getElementById('main-queue').innerText = data.mainQueue;
document.getElementById('retry-queue').innerText = data.retryQueue;
document.getElementById('dlq').innerText = data.dlq;
}
setInterval(loadStats, 5000);
loadStats();
</script>
</head>
<body>
<h1>訂單系統監控</h1>
<ul>
<li>主佇列:<span id="main-queue">-</span></li>
<li>重試佇列:<span id="retry-queue">-</span></li>
<li>DLQ:<span id="dlq">-</span></li>
</ul>
</body>
</html>
可能原因:
Worker 的 setInterval 沒有正常運作
Redis sorted set 的 score 時間戳記錯誤
解決方式:
redis-cli
> ZRANGE queue:retry:v1 0 -1 WITHSCORES
檢查 score 是否小於當前時間戳記。
可能原因:
LINE API token 過期
userId 格式錯誤
網路完全斷線
解決方式:
檢查 .env 設定
查看 DLQ 中的 finalError.message
修正問題後使用 /admin/dlq/retry 重試
可能原因:
錯誤物件未釋放
Redis 連線洩漏
解決方式:
// 限制錯誤堆疊大小
const retryTask = {
...payload,
lastError: {
message: error.message,
stack: error.stack?.substring(0, 500), // 限制 500 字元
},
};
今天我們建立了一套完整的錯誤處理與監控系統:
- Main Queue:正常任務
- Retry Queue:失敗任務自動重試
- Dead Letter Queue:永久失敗任務記錄
- Exponential backoff 策略
- 重試次數上限保護
- 自動錯誤追蹤
- MongoDB / Redis 連線檢查
- Worker 心跳監控
- 佇列統計資訊
- LINE 群組即時警報
- Sentry 專業錯誤追蹤
- Console 詳細日誌
- DLQ 任務查看
- 手動重試功能
- 監控儀表板
系統的可靠性從此大幅提升,即使遇到暫時性錯誤,也能自動修復,確保通知不漏發、訂單不遺失。
明天是最後一天,我們將統整整個系列,並探討「系統上線檢查清單與未來擴充方向」,為這 30 天的實作之旅畫下完美句點!🎉